iT邦幫忙

2025 iThome 鐵人賽

DAY 21
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 21

Day 21: Join 算子 Part 1 - Hash Join 原理

  • 分享至 

  • xImage
  •  

前言

在前兩天的文章中,我們探討了聚合算子的兩種策略:Hash Aggregation 與 Sort-based Aggregation 並觀察 DataFusion 如何選擇最佳的聚合策略,提升分散式處理效能。

不過聚合操作是針對對單個表的數據進行分組和計算,而今天我們要探討的主題——Join(連接)操作,則是關於如何高效地組合多個表的數據。Join 是 SQL 中最常見也最重要的操作之一,幾乎每個複雜查詢都會涉及到它:

-- 電商場景:組合訂單、用戶和產品資訊
SELECT 
    o.order_id,
    u.user_name,
    p.product_name,
    o.quantity,
    o.amount
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2024-01-01';

這個查詢需要將三個表的數據關聯起來。如果 orders 表有 1000 萬行、users 表有 100 萬行、products 表有 10 萬行,如何高效地完成這個連接操作?

答案就在於 Hash Join——現代資料庫系統中最常用的 Join 算法之一。

今天的學習目標:

  1. 理解 Hash Join 的核心概念和工作原理
  2. 掌握 Build Phase(構建階段)和 Probe Phase(探測階段)的詳細流程
  3. 了解 Build 側和 Probe 側的選擇策略
  4. 認識 Hash Table 的構建方式
  5. 理解不同 Join 類型(Inner、Left/Right Outer、Full Outer)的處理差異

Hash Join 的核心概念

什麼是 Join 操作?

在深入 Hash Join 之前,讓我們先回顧 Join 的本質。Join 操作是根據**連接條件(Join Condition)**來組合兩個表的行:

左表(Left Table):              右表(Right Table):
user_id | name                  user_id | city
--------+-------                --------+----------
1       | Alice                 1       | Taipei
2       | Bob                   2       | Taichung
3       | Carol                 4       | Kaohsiung

JOIN 條件: left.user_id = right.user_id

INNER JOIN 結果:
user_id | name  | city
--------+-------+----------
1       | Alice | Taipei
2       | Bob   | Taichung

LEFT OUTER JOIN 結果:
user_id | name  | city
--------+-------+----------
1       | Alice | Taipei
2       | Bob   | Taichung
3       | Carol | NULL

Join 的挑戰在於如何高效地找到匹配的行。如果使用最簡單的嵌套循環(Nested Loop):

// 嵌套循環 Join(效率低)
for left_row in left_table {
    for right_row in right_table {
        if left_row.join_key == right_row.join_key {
            output(combine(left_row, right_row));
        }
    }
}

// 時間複雜度: O(N × M)
// 如果左表 100 萬行,右表 100 萬行 → 需要 1 萬億次比較!

對於大型數據集,這種方法顯然不可行。Hash Join 提供了一個幾乎線性時間複雜度 O(N + M) 的解決方案。

Hash Join 的核心思想

Hash Join 的核心思想是:使用 Hash Table 來加速查找。基本流程如下:

階段 1 - Build Phase(構建階段):
  從較小的表(Build 側)構建一個 Hash Table
  Key = 連接鍵的值
  Value = 該行的數據(或索引)

階段 2 - Probe Phase(探測階段):
  遍歷較大的表(Probe 側)
  對每一行:
    1. 計算連接鍵的 hash 值
    2. 在 Hash Table 中查找匹配的行
    3. 產生結果行

讓我們通過一個具體例子來理解:

左表(users)- 選為 Build 側:
user_id | name
--------+-------
1       | Alice
2       | Bob
3       | Carol

右表(orders)- 選為 Probe 側:
order_id | user_id | amount
---------+---------+--------
101      | 1       | 100
102      | 2       | 200
103      | 1       | 150
104      | 3       | 300

JOIN users ON orders.user_id = users.user_id

Build Phase 詳細流程

步驟 1: 遍歷 users 表,構建 Hash Table

處理 Row 1: user_id=1, name=Alice
  hash(1) = 12345
  hash_table[12345] = { user_id: 1, name: "Alice" }

處理 Row 2: user_id=2, name=Bob
  hash(2) = 67890
  hash_table[67890] = { user_id: 2, name: "Bob" }

處理 Row 3: user_id=3, name=Carol
  hash(3) = 11111
  hash_table[11111] = { user_id: 3, name: "Carol" }

構建完成的 Hash Table:
┌──────────┬─────────────────────────┐
│ Hash值   │ 數據                     │
├──────────┼─────────────────────────┤
│ 12345    │ {user_id: 1, name: Alice}│
│ 67890    │ {user_id: 2, name: Bob}  │
│ 11111    │ {user_id: 3, name: Carol}│
└──────────┴─────────────────────────┘

Probe Phase 詳細流程

步驟 2: 遍歷 orders 表,查找匹配

處理 order 101: user_id=1, amount=100
  1. hash(1) = 12345
  2. 在 hash_table[12345] 找到: {user_id: 1, name: "Alice"}
  3. 驗證: 1 == 1 ✓ 匹配!
  4. 產生結果: {order_id: 101, user_id: 1, name: "Alice", amount: 100}

處理 order 102: user_id=2, amount=200
  1. hash(2) = 67890
  2. 在 hash_table[67890] 找到: {user_id: 2, name: "Bob"}
  3. 驗證: 2 == 2 ✓ 匹配!
  4. 產生結果: {order_id: 102, user_id: 2, name: "Bob", amount: 200}

處理 order 103: user_id=1, amount=150
  1. hash(1) = 12345
  2. 在 hash_table[12345] 找到: {user_id: 1, name: "Alice"}
  3. 驗證: 1 == 1 ✓ 匹配!
  4. 產生結果: {order_id: 103, user_id: 1, name: "Alice", amount: 150}

處理 order 104: user_id=3, amount=300
  1. hash(3) = 11111
  2. 在 hash_table[11111] 找到: {user_id: 3, name: "Carol"}
  3. 驗證: 3 == 3 ✓ 匹配!
  4. 產生結果: {order_id: 104, user_id: 3, name: "Carol", amount: 300}

最終結果:
order_id | user_id | name  | amount
---------+---------+-------+--------
101      | 1       | Alice | 100
102      | 2       | Bob   | 200
103      | 1       | Alice | 150
104      | 3       | Carol | 300

為何 Hash Join 如此高效?

時間複雜度分析:

設左表有 N 行,右表有 M 行

Nested Loop Join:
  - 需要 N × M 次比較
  - 時間複雜度: O(N × M)
  - 例如: 1000 × 10000 = 1000 萬次比較

Hash Join:
  - Build Phase: 遍歷 N 行,每行 O(1) 插入 → O(N)
  - Probe Phase: 遍歷 M 行,每行 O(1) 查找 → O(M)
  - 總時間複雜度: O(N + M)
  - 例如: 1000 + 10000 = 11000 次操作

性能提升: 1000 萬 / 11000 ≈ 909 倍!

關鍵優勢:

  1. 查找速度:Hash Table 提供 O(1) 平均查找時間
  2. 單次掃描:每個表只需要掃描一次
  3. 並行友好:Probe 階段可以輕鬆並行化
  4. 記憶體可控:只需將較小的表放入記憶體

DataFusion 中的 Hash Join 實現

HashJoinExec 的核心結構

讓我們看看 DataFusion 如何實現 Hash Join:

// datafusion/physical-plan/src/joins/hash_join/exec.rs
pub struct HashJoinExec {
    /// 左側(build 側)輸入
    pub left: Arc<dyn ExecutionPlan>,
    
    /// 右側(probe 側)輸入
    pub right: Arc<dyn ExecutionPlan>,
    
    /// 連接條件:(left_col, right_col) 對
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    
    /// 額外的過濾條件(非等值條件)
    pub filter: Option<JoinFilter>,
    
    /// Join 類型(INNER、LEFT、RIGHT、FULL 等)
    pub join_type: JoinType,
    
    /// Build 階段的 Future(用於非同步構建 Hash Table)
    left_fut: Arc<OnceAsync<JoinLeftData>>,
    
    /// Hash 函數的隨機種子
    random_state: RandomState,
    
    /// 分區模式
    pub mode: PartitionMode,
    
    // ... 其他欄位
}

關鍵設計決策:

  1. left = build 側,right = probe 側:這是 DataFusion 的約定
  2. left_fut: Arc<OnceAsync>:使用 Future 非同步構建 Hash Table,多個 probe 執行緒可以共享同一個 build 結果
  3. on: Vec<(PhysicalExprRef, PhysicalExprRef)>:支援多欄位連接(如 ON a.col1 = b.col1 AND a.col2 = b.col2

JoinLeftData - Build 側的數據結構

Build 階段完成後,會產生 JoinLeftData 結構:

pub(super) struct JoinLeftData {
    /// Hash Table,存儲 hash 值 → 行索引的映射
    pub(super) hash_map: Box<dyn JoinHashMapType>,
    
    /// 連接後的單一 RecordBatch(包含所有 build 側數據)
    batch: RecordBatch,
    
    /// Build 側連接鍵的值(用於驗證)
    values: Vec<ArrayRef>,
    
    /// 用於追蹤哪些 build 側行已被匹配(用於 OUTER JOIN)
    visited_indices_bitmap: SharedBitmapBuilder,
    
    /// Probe 執行緒計數器
    probe_threads_counter: AtomicUsize,
    
    /// 記憶體保留(用於記憶體管理)
    _reservation: MemoryReservation,
    
    /// Build 側的邊界資訊(用於動態過濾優化)
    pub(super) bounds: Option<Vec<ColumnBounds>>,
}

重要概念:

  • 單一 RecordBatch:Build 階段會將所有輸入 batch 合併成一個大的 RecordBatch,簡化索引管理
  • visited_indices_bitmap:對於 LEFT/RIGHT/FULL OUTER JOIN,需要追蹤哪些行已被匹配,最後輸出未匹配的行
  • SharedBitmapBuilder:使用共享的 bitmap,允許多個並行 probe 執行緒同時更新

Build Phase 深入解析

Build Phase 的完整流程

Build Phase 是 Hash Join 的第一階段,負責從 build 側構建 Hash Table。讓我們深入理解這個過程:

// 簡化的 Build Phase 實作
async fn build_hash_table(
    build_input: SendableRecordBatchStream,
    on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
    random_state: &RandomState,
) -> Result<JoinLeftData> {
    // 步驟 1: 收集所有 build 側的 RecordBatch
    let mut batches = Vec::new();
    let mut total_rows = 0;
    
    while let Some(batch) = build_input.next().await {
        let batch = batch?;
        total_rows += batch.num_rows();
        batches.push(batch);
    }
    
    // 步驟 2: 合併所有 batch 成單一 RecordBatch
    let merged_batch = concat_batches(&schema, &batches)?;
    
    // 步驟 3: 評估連接鍵表達式
    // 例如:如果 ON 條件是 left.id = right.id,則評估 left.id 欄位
    let join_key_values: Vec<ArrayRef> = on_columns
        .iter()
        .map(|(left_expr, _)| left_expr.evaluate(&merged_batch)?.into_array())
        .collect()?;
    
    // 步驟 4: 創建 Hash Table
    let mut hash_map = JoinHashMapU64::new();
    
    // 步驟 5: 計算每行的 hash 值並插入 Hash Table
    let mut hashes = vec![0u64; total_rows];
    create_hashes(&join_key_values, random_state, &mut hashes)?;
    
    // 步驟 6: 以反向順序插入(保持原始順序)
    for row_index in (0..total_rows).rev() {
        let hash_value = hashes[row_index];
        hash_map.insert(hash_value, row_index);
    }
    
    // 步驟 7: 創建 visited bitmap(用於 OUTER JOIN)
    let visited_bitmap = SharedBitmapBuilder::new(total_rows);
    
    // 步驟 8: 返回 JoinLeftData
    Ok(JoinLeftData::new(
        Box::new(hash_map),
        merged_batch,
        join_key_values,
        visited_bitmap,
        // ... 其他欄位
    ))
}

為何以反向順序插入 Hash Table?

這是 DataFusion 的一個巧妙設計。讓我們理解其原因:

假設有 3 個 build 側 batch:

Batch 1: Row 0, Row 1
Batch 2: Row 2, Row 3, Row 4  
Batch 3: Row 5, Row 6

合併後的單一 batch:
Index: 0    1    2    3    4    5    6
Row:   R0   R1   R2   R3   R4   R5   R6

如果連接鍵有重複值,例如 Row 0 和 Row 5 都是 user_id=1:

方法 A - 正向插入(0 → 6):
  insert(hash(1), 0)  → hash_table[hash(1)] = 0
  insert(hash(1), 5)  → hash_table[hash(1)] = 5  (覆蓋了 0)
  
  使用 LIFO(後進先出)結構,查找時會先找到 index 5,再找到 index 0
  結果順序: Row 5, Row 0  ❌ 順序顛倒!

方法 B - 反向插入(6 → 0):
  insert(hash(1), 5)  → hash_table[hash(1)] = 5
  insert(hash(1), 0)  → hash_table[hash(1)] = 0  (0 在 "top")
  
  查找時會先找到 index 0,再找到 index 5
  結果順序: Row 0, Row 5  ✓ 保持原始順序!

DataFusion 在註解中也說明了這一點:

// datafusion/physical-plan/src/joins/hash_join/exec.rs

/// Hash join uses LIFO data structure as a hash table, and in order to retain
/// original build-side input order while obtaining data during probe phase, hash
/// table is updated by iterating batch sequence in reverse order -- it allows to
/// keep rows with smaller indices "on the top" of hash table, and still maintain
/// correct indexing for concatenated build-side data batch.

Hash Table 的實際結構

DataFusion 支援兩種 Hash Table 實現:

pub trait JoinHashMapType {
    /// 插入一個 hash 值和對應的行索引
    fn insert(&mut self, hash: u64, row_index: usize);
    
    /// 查找匹配的行索引
    fn get(&self, hash: u64) -> Option<impl Iterator<Item = usize>>;
    
    /// 返回總行數
    fn size(&self) -> usize;
}

// 實現 1: 使用 u32 索引(適合較小的表,< 42 億行)
pub struct JoinHashMapU32 {
    // 使用 HashMap<u64, SmallVec<[u32; 1]>>
    // SmallVec 優化:如果只有 1 個匹配,不需要 heap 分配
}

// 實現 2: 使用 u64 索引(適合超大表)
pub struct JoinHashMapU64 {
    // 使用 HashMap<u64, SmallVec<[u64; 1]>>
}

SmallVec 優化:

大部分情況下,每個 hash 值只對應 1 個行索引:
  hash(12345) → [42]          // 單一值,存儲在棧上
  
少數情況下,可能有多個匹配(連接鍵有重複):
  hash(67890) → [10, 25, 38] // 多個值,需要 heap 分配

SmallVec<[u32; 1]> 的優勢:
  - 單一值時:零 heap 分配,快速
  - 多個值時:自動擴展到 heap
  - 節省記憶體和提升性能

Build Phase 的性能考量

記憶體需求:

假設 build 側有 1000 萬行,每行平均 100 bytes:

1. 原始數據: 10M × 100 bytes = 1 GB
2. 合併後的 RecordBatch: 1 GB(Arrow 列式格式,高效)
3. Hash Table: 
   - 10M 個 hash 值(u64): 10M × 8 = 80 MB
   - 10M 個索引(u32): 10M × 4 = 40 MB
   - HashMap 開銷(約 1.5x): 180 MB
4. 連接鍵 values: 取決於類型,假設 80 MB

總記憶體: 1 GB + 180 MB + 80 MB ≈ 1.26 GB

結論: Hash Table 的開銷相對較小(約 20%)

時間開銷:

操作分解:
1. 收集和合併 batch: O(N),主要是記憶體拷貝
2. 評估連接鍵表達式: O(N),通常很快(簡單欄位引用)
3. 計算 hash 值: O(N),使用高效的 hash 函數(ahash)
4. 插入 Hash Table: O(N) 平均,每次插入 O(1)

總時間: O(N),線性於 build 側的行數

Probe Phase 深入解析

Probe Phase 的完整流程

Probe Phase 是 Hash Join 的第二階段,負責遍歷 probe 側數據並在 Hash Table 中查找匹配:

// 簡化的 Probe Phase 實作
async fn probe_hash_table(
    probe_input: SendableRecordBatchStream,
    join_left_data: Arc<JoinLeftData>,
    on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
    join_type: JoinType,
) -> Result<Vec<RecordBatch>> {
    let mut output_batches = Vec::new();
    
    // 遍歷 probe 側的每個 batch
    while let Some(probe_batch) = probe_input.next().await {
        let probe_batch = probe_batch?;
        let num_rows = probe_batch.num_rows();
        
        // 步驟 1: 評估 probe 側的連接鍵
        let probe_join_keys: Vec<ArrayRef> = on_columns
            .iter()
            .map(|(_, right_expr)| right_expr.evaluate(&probe_batch)?.into_array())
            .collect()?;
        
        // 步驟 2: 計算 probe 側的 hash 值
        let mut probe_hashes = vec![0u64; num_rows];
        create_hashes(&probe_join_keys, &random_state, &mut probe_hashes)?;
        
        // 步驟 3: 查找匹配並記錄索引
        let mut build_indices = Vec::new();  // 匹配的 build 側索引
        let mut probe_indices = Vec::new();  // 匹配的 probe 側索引
        
        for probe_idx in 0..num_rows {
            let hash_value = probe_hashes[probe_idx];
            
            // 在 Hash Table 中查找匹配的 build 側行
            if let Some(build_idx_iter) = join_left_data.hash_map().get(hash_value) {
                for build_idx in build_idx_iter {
                    // 步驟 4: 驗證連接鍵是否真正相等(處理 hash 衝突)
                    if equal_join_keys(
                        &probe_join_keys, 
                        probe_idx,
                        join_left_data.values(), 
                        build_idx
                    )? {
                        // 找到匹配!
                        build_indices.push(build_idx);
                        probe_indices.push(probe_idx);
                        
                        // 標記 build 側行已被訪問(用於 OUTER JOIN)
                        join_left_data.visited_indices_bitmap()
                            .set_bit(build_idx);
                    }
                }
            }
        }
        
        // 步驟 5: 根據 join 類型調整索引
        let (final_build_indices, final_probe_indices) = 
            adjust_indices_by_join_type(
                build_indices,
                probe_indices,
                num_rows,
                join_left_data.batch().num_rows(),
                join_type,
            )?;
        
        // 步驟 6: 使用索引構建輸出 batch
        let output_batch = build_batch_from_indices(
            join_left_data.batch(),
            &probe_batch,
            &final_build_indices,
            &final_probe_indices,
            &column_indices,
        )?;
        
        output_batches.push(output_batch);
    }
    
    Ok(output_batches)
}

匹配查找的詳細過程

讓我們通過一個具體例子來理解 Probe Phase:

Build 側(已構建 Hash Table):
Index | user_id | name
------+---------+-------
0     | 1       | Alice
1     | 2       | Bob
2     | 1       | Alice2  (重複的 user_id)
3     | 3       | Carol

Hash Table 內容(簡化表示):
hash(1) → [2, 0]  // LIFO: 反向插入,所以 0 在頂部
hash(2) → [1]
hash(3) → [3]

Probe 側 batch:
Index | order_id | user_id | amount
------+----------+---------+--------
0     | 101      | 1       | 100
1     | 102      | 2       | 200
2     | 103      | 1       | 150
3     | 104      | 4       | 300

處理過程:

處理 probe_idx=0 (order_id=101, user_id=1, amount=100):
  1. hash(1) = 12345
  2. 查找 hash_table[12345] → 找到 [2, 0]
  3. 檢查 build_idx=2:
     - 比較: probe[0].user_id (1) == build[2].user_id (1) ✓
     - 記錄匹配: (build_idx=2, probe_idx=0)
  4. 檢查 build_idx=0:
     - 比較: probe[0].user_id (1) == build[0].user_id (1) ✓
     - 記錄匹配: (build_idx=0, probe_idx=0)
  
  結果: 產生 2 個輸出行(笛卡爾積)

處理 probe_idx=1 (order_id=102, user_id=2, amount=200):
  1. hash(2) = 67890
  2. 查找 hash_table[67890] → 找到 [1]
  3. 檢查 build_idx=1:
     - 比較: probe[1].user_id (2) == build[1].user_id (2) ✓
     - 記錄匹配: (build_idx=1, probe_idx=1)
  
  結果: 產生 1 個輸出行

處理 probe_idx=2 (order_id=103, user_id=1, amount=150):
  1. hash(1) = 12345
  2. 查找 hash_table[12345] → 找到 [2, 0]
  3. 檢查 build_idx=2:
     - 比較: probe[2].user_id (1) == build[2].user_id (1) ✓
     - 記錄匹配: (build_idx=2, probe_idx=2)
  4. 檢查 build_idx=0:
     - 比較: probe[2].user_id (1) == build[0].user_id (1) ✓
     - 記錄匹配: (build_idx=0, probe_idx=2)
  
  結果: 產生 2 個輸出行

處理 probe_idx=3 (order_id=104, user_id=4, amount=300):
  1. hash(4) = 99999
  2. 查找 hash_table[99999] → 未找到
  3. 無匹配
  
  結果: 對於 INNER JOIN,不產生輸出;對於 LEFT JOIN,產生 NULL 填充行

最終 build_indices = [2, 0, 1, 2, 0]
最終 probe_indices = [0, 0, 1, 2, 2]

構建輸出 batch:
order_id | user_id | name   | amount
---------+---------+--------+--------
101      | 1       | Alice2 | 100    (build[2], probe[0])
101      | 1       | Alice  | 100    (build[0], probe[0])
102      | 2       | Bob    | 200    (build[1], probe[1])
103      | 1       | Alice2 | 150    (build[2], probe[2])
103      | 1       | Alice  | 150    (build[0], probe[2])

Hash 衝突的處理

Hash 衝突是指不同的連接鍵值產生相同的 hash 值:

範例情況:
  hash(user_id=1) = 12345
  hash(user_id=999) = 12345  // 衝突!

Hash Table:
  hash_table[12345] → [build_idx for user_id=1, build_idx for user_id=999]

Probe 時處理:
  當 probe user_id=1 時:
    1. hash(1) = 12345
    2. 查找 hash_table[12345] → 找到多個候選
    3. **必須逐一驗證連接鍵值**:
       - build[x].user_id == 1? ✓ 真正匹配
       - build[y].user_id == 999? ✗ Hash 衝突,跳過

這就是為何 Probe 階段不能只依賴 hash 值,必須明確比較連接鍵的實際值

Probe Phase 的並行化

Probe Phase 天然適合並行化:

場景: probe 側有 4 個分區

分區 0 (獨立執行緒 0):         ┐
  probe_batch_0 → 查找 hash_table → output_batch_0
                                  │
分區 1 (獨立執行緒 1):         │  所有執行緒共享
  probe_batch_1 → 查找 hash_table → output_batch_1   同一個 Hash Table
                                  │
分區 2 (獨立執行緒 2):         │  (只讀操作,無競爭)
  probe_batch_2 → 查找 hash_table → output_batch_2
                                  │
分區 3 (獨立執行緒 3):         ┘
  probe_batch_3 → 查找 hash_table → output_batch_3

優勢:
  - Hash Table 構建一次,多個執行緒共享(節省記憶體)
  - Probe 操作只讀,無需鎖(高並行度)
  - 線性擴展性: 4 個核心 ≈ 4 倍速度

注意事項:

對於需要追蹤 visited bitmap 的 OUTER JOIN,需要原子操作來更新 bitmap,但這個開銷很小:

// 使用原子操作設置 bitmap
join_left_data.visited_indices_bitmap().set_bit(build_idx);

// 內部使用 atomic compare-and-swap
fn set_bit(&self, index: usize) {
    let byte_index = index / 8;
    let bit_mask = 1 << (index % 8);
    
    // 原子操作,多執行緒安全
    self.bitmap[byte_index].fetch_or(bit_mask, Ordering::Relaxed);
}

Build 側與 Probe 側的選擇

為何選擇很重要?

Build 側的選擇對性能有巨大影響:

場景: 小表 (1000 行) JOIN 大表 (100 萬行)

選擇 A - 小表做 Build,大表做 Probe:
  Build Phase:  1000 行 → Hash Table 占用 ~10 KB
  Probe Phase:  100 萬行 × O(1) 查找 = 100 萬次操作
  總記憶體:    10 KB
  總時間:      約 100 ms

選擇 B - 大表做 Build,小表做 Probe:
  Build Phase:  100 萬行 → Hash Table 占用 ~10 MB
  Probe Phase:  1000 行 × O(1) 查找 = 1000 次操作
  總記憶體:    10 MB  (1000 倍差異!)
  總時間:      約 200 ms

結論: 應該選擇小表做 Build 側

DataFusion 的選擇策略

DataFusion 的物理規劃器會在生成執行計劃時選擇 Build 側:

// 簡化的選擇邏輯
fn should_swap_join_inputs(
    left_stats: &Statistics,
    right_stats: &Statistics,
    join_type: JoinType,
) -> bool {
    // 規則 1: 某些 join 類型不能交換
    if !join_type.supports_swap() {
        return false;
    }
    
    // 規則 2: 如果有明確的行數統計,選擇較小的做 build
    if let (Some(left_rows), Some(right_rows)) = 
        (left_stats.num_rows, right_stats.num_rows) 
    {
        return left_rows > right_rows; // 如果左側更大,交換
    }
    
    // 規則 3: 使用總字節數估計
    if let (Some(left_bytes), Some(right_bytes)) = 
        (left_stats.total_byte_size, right_stats.total_byte_size) 
    {
        return left_bytes > right_bytes;
    }
    
    // 規則 4: 無統計資訊時,保持原樣
    false
}

實際案例:

-- 原始查詢
SELECT *
FROM large_orders o
JOIN small_users u ON o.user_id = u.user_id;

-- 優化器決策:
-- large_orders: 1000 萬行,1 GB
-- small_users:  10 萬行,10 MB

-- 物理計劃(可能):
HashJoinExec
  ├─ small_users  (Build 側)
  └─ large_orders (Probe 側)

-- 如果優化器判斷 large_orders 更小,會自動交換:
HashJoinExec
  ├─ large_orders (Build 側)
  └─ small_users  (Probe 側)

Join 類型對選擇的限制

不是所有 Join 類型都能自由交換 Build 和 Probe 側:

INNER JOIN:
  A JOIN B == B JOIN A
  ✓ 可以交換

LEFT OUTER JOIN:
  A LEFT JOIN B != B LEFT JOIN A
  ✗ 不能直接交換
  但可以轉換: A LEFT JOIN B == B RIGHT JOIN A
  ✓ 可以通過改變 Join 類型來交換

RIGHT OUTER JOIN:
  A RIGHT JOIN B == B LEFT JOIN A
  ✓ 可以通過改變 Join 類型來交換

FULL OUTER JOIN:
  A FULL JOIN B == B FULL JOIN A
  ✓ 可以交換

SEMI/ANTI JOIN:
  有特定的語義,需要謹慎處理

不同 Join 類型的處理

INNER JOIN

INNER JOIN 只輸出兩側都有匹配的行:

Build 側:                    Probe 側:
id | name                    id | city
---+-------                  ---+----------
1  | Alice                   1  | Taipei
2  | Bob                     2  | Taichung
3  | Carol                   4  | Kaohsiung

INNER JOIN 處理:
  Probe id=1: 在 Hash Table 找到 → 輸出 (1, Alice, Taipei)
  Probe id=2: 在 Hash Table 找到 → 輸出 (2, Bob, Taichung)
  Probe id=4: 在 Hash Table 未找到 → 不輸出(跳過)

結果:
id | name  | city
---+-------+----------
1  | Alice | Taipei
2  | Bob   | Taichung

visited_indices_bitmap: [true, true, false]
// Build 側的 id=3 未被匹配

實作要點:

  • 只記錄找到匹配的 (build_idx, probe_idx) 對
  • 不需要處理未匹配的行
  • visited_indices_bitmap 不影響輸出(但可用於統計)

LEFT OUTER JOIN

LEFT OUTER JOIN 輸出所有 probe 側的行,即使沒有匹配:

Build 側:                    Probe 側:
id | name                    id | city
---+-------                  ---+----------
1  | Alice                   1  | Taipei
2  | Bob                     2  | Taichung
                              4  | Kaohsiung

LEFT OUTER JOIN 處理:
  Probe id=1: 在 Hash Table 找到 → 輸出 (1, Taipei, Alice)
  Probe id=2: 在 Hash Table 找到 → 輸出 (2, Taichung, Bob)
  Probe id=4: 在 Hash Table 未找到 → 輸出 (4, Kaohsiung, NULL)

結果:
id | city      | name
---+-----------+-------
1  | Taipei    | Alice
2  | Taichung  | Bob
4  | Kaohsiung | NULL

// 注意: probe 側的 id=4 雖然沒有匹配,仍然輸出

實作要點:

  • 對於沒有匹配的 probe 行,build 側欄位填充 NULL
  • 需要追蹤哪些 probe 行已找到匹配
  • 單階段處理即可完成

RIGHT OUTER JOIN

RIGHT OUTER JOIN 輸出所有 build 側的行,即使沒有匹配:

Build 側:                    Probe 側:
id | name                    id | city
---+-------                  ---+----------
1  | Alice                   1  | Taipei
2  | Bob                     4  | Kaohsiung
3  | Carol

RIGHT OUTER JOIN 處理:
  Probe Phase:
    Probe id=1: 找到匹配 → 輸出 (1, Alice, Taipei)
    Probe id=4: 未找到匹配 → 跳過
  
  Final Phase(處理未匹配的 build 側行):
    Build id=2: visited_bitmap[1] = true → 已匹配,跳過
    Build id=3: visited_bitmap[2] = false → 未匹配,輸出 (3, Carol, NULL)

結果:
id | name  | city
---+-------+----------
1  | Alice | Taipei
2  | Bob   | NULL     (未在 probe 側找到)
3  | Carol | NULL     (未在 probe 側找到)

// 注意: 需要在 Probe 結束後,額外輸出未匹配的 build 行

實作要點:

  • Probe Phase: 正常處理,同時更新 visited_indices_bitmap
  • Final Phase: 遍歷 visited_indices_bitmap,輸出未訪問的 build 側行(probe 欄位填充 NULL)
  • 需要兩個階段

FULL OUTER JOIN

FULL OUTER JOIN 輸出兩側的所有行,無論是否匹配:

Build 側:                    Probe 側:
id | name                    id | city
---+-------                  ---+----------
1  | Alice                   1  | Taipei
2  | Bob                     4  | Kaohsiung
3  | Carol

FULL OUTER JOIN 處理:
  Probe Phase:
    Probe id=1: 找到匹配 → 輸出 (1, Alice, Taipei)
    Probe id=4: 未找到 → 輸出 (4, NULL, Kaohsiung)
  
  Final Phase(處理未匹配的 build 側行):
    Build id=2: visited_bitmap[1] = false → 輸出 (2, Bob, NULL)
    Build id=3: visited_bitmap[2] = false → 輸出 (3, Carol, NULL)

結果:
id | name  | city
---+-------+----------
1  | Alice | Taipei
2  | Bob   | NULL
3  | Carol | NULL
4  | NULL  | Kaohsiung

實作要點:

  • 結合 LEFT OUTER 和 RIGHT OUTER 的邏輯
  • Probe Phase: 未匹配的 probe 行也輸出(build 側填充 NULL)
  • Final Phase: 輸出未匹配的 build 側行(probe 側填充 NULL)

原始碼實作

// datafusion/physical-plan/src/joins/utils.rs

pub fn adjust_indices_by_join_type(
    mut build_indices: Vec<usize>,
    mut probe_indices: Vec<usize>,
    probe_batch_size: usize,
    build_batch_size: usize,
    join_type: JoinType,
) -> Result<(UInt64Array, UInt32Array)> {
    match join_type {
        JoinType::Inner => {
            // 直接返回找到匹配的索引對
            // 無需額外處理
        }
        JoinType::Left => {
            // 檢查每個 probe 行是否有匹配
            let mut matched_probe = vec![false; probe_batch_size];
            for &probe_idx in &probe_indices {
                matched_probe[probe_idx] = true;
            }
            
            // 為未匹配的 probe 行添加 (NULL, probe_idx)
            for probe_idx in 0..probe_batch_size {
                if !matched_probe[probe_idx] {
                    build_indices.push(NULL_INDEX); // 特殊值表示 NULL
                    probe_indices.push(probe_idx);
                }
            }
        }
        JoinType::Right => {
            // Probe Phase 正常處理
            // Final Phase 由調用方處理 visited_indices_bitmap
        }
        JoinType::Full => {
            // 結合 Left 和 Right 的邏輯
            // ...
        }
        // 其他 Join 類型...
    }
    
    Ok((
        UInt64Array::from(build_indices),
        UInt32Array::from(probe_indices),
    ))
}

性能考量與優化

記憶體管理

Hash Join 的記憶體使用主要來自 Build 側:

記憶體組成:
1. Build 側合併 RecordBatch: 取決於數據大小
2. Hash Table: 約為 Build 側數據的 15-20%
3. Visited Bitmap (OUTER JOIN): 每行 1 bit

優化策略:
- 使用較小的表做 Build 側(優化器自動選擇)
- 使用 u32 索引替代 u64(適用於 < 42 億行)
- 使用 SmallVec 減少小型匹配列表的 heap 分配
- 啟用記憶體限制,防止 OOM

當記憶體不足時

如果 Build 側太大無法放入記憶體,有幾種策略:

策略 1: Grace Hash Join(優雅 Hash Join)

將兩個輸入表分區(partition):
1. 按連接鍵的 hash 值將兩個表分成 N 個分區
2. 對每個分區獨立執行 Hash Join
3. 每個分區可以放入記憶體

好處: 即使總數據很大,每個分區都很小

策略 2: Hybrid Hash Join(混合 Hash Join)

部分分區在記憶體中,部分在磁碟上:
1. 盡可能多的分區放在記憶體中處理
2. 其餘分區 spill 到磁碟
3. 第二輪從磁碟讀取並處理

好處: 充分利用可用記憶體,減少磁碟 I/O

DataFusion 當前主要使用第一種策略,通過 PartitionMode 控制:

pub enum PartitionMode {
    /// 收集整個 build 側到單一分區(適合小表)
    CollectLeft,
    
    /// 兩側都分區(適合大表)
    Partitioned,
    
    /// 自動選擇
    Auto,
}

並行化與分區

CollectLeft 模式:

Build 側(單一分區):          Probe 側(多分區):
     ┌─────┐                  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
     │All  │                  │ P0  │ │ P1  │ │ P2  │ │ P3  │
     │Data │                  └─────┘ └─────┘ └─────┘ └─────┘
     └─────┘                     │       │       │       │
        │                        ▼       ▼       ▼       ▼
        │                     ┌─────────────────────────────┐
        └────────────────────►│   Shared Hash Table         │
                              │   (由所有 probe 執行緒共享)  │
                              └─────────────────────────────┘

優勢: Probe 階段完全並行,無資料交換
適用: Build 側較小(可放入記憶體)

Partitioned 模式:

Build 側(分區):              Probe 側(分區):
┌─────┐ ┌─────┐ ┌─────┐     ┌─────┐ ┌─────┐ ┌─────┐
│ P0  │ │ P1  │ │ P2  │     │ P0  │ │ P1  │ │ P2  │
└─────┘ └─────┘ └─────┘     └─────┘ └─────┘ └─────┘
   │       │       │            │       │       │
   ▼       ▼       ▼            ▼       ▼       ▼
┌─────┐ ┌─────┐ ┌─────┐     (相同 partition 編號的
│Hash0│ │Hash1│ │Hash2│      數據一起處理)
└─────┘ └─────┘ └─────┘

步驟:
1. 按連接鍵 hash 重分區兩個輸入
2. 每個分區獨立執行 Hash Join
3. 無需全局同步

優勢: 適合大型 Build 側,每個分區獨立
適用: 兩個輸入都很大

統計資訊的重要性

準確的統計資訊對 Hash Join 性能很重要:

-- 場景 1: 有準確統計
-- 優化器知道 users 只有 10 萬行,orders 有 1000 萬行
SELECT * FROM orders o JOIN users u ON o.user_id = u.user_id;

-- 物理計劃:
HashJoinExec
  ├─ users (Build 側, 10 萬行)  ← 正確選擇
  └─ orders (Probe 側, 1000 萬行)

-- 場景 2: 沒有統計(或統計過時)
-- 優化器無法判斷哪個表更小
SELECT * FROM orders o JOIN users u ON o.user_id = u.user_id;

-- 物理計劃(可能):
HashJoinExec
  ├─ orders (Build 側, 1000 萬行)  ← 錯誤選擇!記憶體暴增
  └─ users (Probe 側, 10 萬行)

解決方法:
-- 更新統計資訊
ANALYZE TABLE orders;
ANALYZE TABLE users;

小結

今天我們深入探討了 Hash Join 算子的原理和實現:

  1. 兩階段執行模型

    • Build Phase:構建 Hash Table(O(N))
    • Probe Phase:查找匹配(O(M))
    • 總時間複雜度:O(N + M),遠優於 Nested Loop 的 O(N × M)
  2. Build 側與 Probe 側的選擇

    • 應該將較小的表作為 Build 側
    • DataFusion 優化器基於統計資訊自動選擇
    • 錯誤的選擇會導致記憶體暴增和性能下降
  3. Hash Table 的設計

    • 使用 HashMap<hash_value, Vec<row_index>> 結構
    • 反向順序插入保持原始順序(LIFO 特性)
    • SmallVec 優化減少記憶體分配
    • 必須處理 Hash 衝突,驗證實際鍵值

DataFusion 的實現亮點

  1. 非同步 Build Phase:使用 OnceAsync<JoinLeftData> 允許多個 probe 執行緒共享同一個 Hash Table
  2. 並行 Probe:Probe 階段完全並行,線性擴展到多核
  3. 記憶體管理:使用 MemoryReservation 追蹤記憶體使用,支援 Spilling
  4. 分區策略CollectLeft 適合小表,Partitioned 適合大表
  5. 統計驅動優化:基於表統計自動選擇最優的 Build 側

性能關鍵因素

  1. 正確的 Build 側選擇:影響記憶體使用和性能
  2. 準確的統計資訊:幫助優化器做出正確決策
  3. 合適的分區模式:根據數據大小選擇 CollectLeft 或 Partitioned
  4. Hash 函數質量:好的 hash 函數減少衝突,提升性能

Hash Join 是實現高效 Join 操作的基礎。然而,它並非萬能——在某些場景下(如輸入已排序、記憶體嚴重受限),Sort-Merge Join 可能是更好的選擇。因此明天我們將探討另一種重要的 Join 算法 -- Sort-Merge Join,並學習優化器如何在不同 Join 策略之間做出選擇。

參考資料

  1. DataFusion HashJoinExec 原始碼
  2. HashJoinStream 實現
  3. Join Utilities
  4. JoinHashMap Trait
  5. Hash Joins in Modern Databases
  6. Efficient Hash Joins on Modern Processors
  7. DataFusion Physical Optimizer
  8. PostgreSQL Hash Join Implementation
  9. Grace Hash Join
  10. Apache Arrow 列式記憶體格式

上一篇
Day 20: 聚合算子 Part 2 - Hash vs Sort Aggregate
下一篇
Day 22: Join 算子 Part 2 - Sort-Merge Join 和策略選擇
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言